package com.lhf.azkaban.springbatch.example.job.dao;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.repository.dao.JdbcStepExecutionDao;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.support.PatternMatcher;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer;
import org.springframework.util.Assert;

import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

/**
 * Database access class for job step executions. Extends Spring Batch's
 * {@link JdbcStepExecutionDao} with read-only search, paging and counting queries.
 *
 * <p>Table names in the SQL below use the {@code %PREFIX%} placeholder and are resolved
 * through {@code getQuery(String)}, so a table prefix configured on this DAO is honoured
 * (Spring Batch's default prefix is {@code BATCH_}).
 *
 * <p>Created 2019/11/25 18:12.
 *
 * @author 95780
 */
public class MyJdbcStepExecutionDao extends JdbcStepExecutionDao {

    /** Distinct step names recorded for all executions of a job (bind: job name). */
    private static final String STEP_EXECUTIONS_FOR_JOB = "SELECT distinct STEP_NAME from %PREFIX%STEP_EXECUTION S, %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I "
            + "where S.JOB_EXECUTION_ID = E.JOB_EXECUTION_ID AND E.JOB_INSTANCE_ID = I.JOB_INSTANCE_ID AND I.JOB_NAME = ?";

    /** Number of step executions for an exact job name and exact step name. */
    private static final String COUNT_STEP_EXECUTIONS_FOR_STEP = "SELECT COUNT(STEP_EXECUTION_ID) from %PREFIX%STEP_EXECUTION S, %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I "
            + "where S.JOB_EXECUTION_ID = E.JOB_EXECUTION_ID AND E.JOB_INSTANCE_ID = I.JOB_INSTANCE_ID AND I.JOB_NAME = ? AND S.STEP_NAME = ?";

    /** Number of step executions for an exact job name and a SQL LIKE pattern on the step name. */
    private static final String COUNT_STEP_EXECUTIONS_FOR_STEP_PATTERN = "SELECT COUNT(STEP_EXECUTION_ID) from %PREFIX%STEP_EXECUTION S, %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I"
            + " where S.JOB_EXECUTION_ID = E.JOB_EXECUTION_ID AND E.JOB_INSTANCE_ID = I.JOB_INSTANCE_ID AND I.JOB_NAME = ? AND S.STEP_NAME like ?";

    /**
     * Select list shared by all queries that are mapped with {@link StepExecutionRowMapper}.
     * The mapper reads columns by position, so the order here must not change independently.
     */
    private static final String FIELDS = "S.STEP_EXECUTION_ID, S.STEP_NAME, S.START_TIME, S.END_TIME, S.STATUS, S.COMMIT_COUNT,"
            + " S.READ_COUNT, S.FILTER_COUNT, S.WRITE_COUNT, S.EXIT_CODE, S.EXIT_MESSAGE, S.READ_SKIP_COUNT, S.WRITE_SKIP_COUNT,"
            + " S.PROCESS_SKIP_COUNT, S.ROLLBACK_COUNT, S.LAST_UPDATED, S.VERSION";

    /**
     * All step executions of one job execution (bind: job execution id), oldest id first.
     * Deliberately has no trailing ';' because some JDBC drivers reject a statement terminator.
     */
    private static final String STEP_EXECUTIONS_FOR_JOB_EXECUTION = "select " + FIELDS
            + " from %PREFIX%STEP_EXECUTION S where S.JOB_EXECUTION_ID = ? order by S.STEP_EXECUTION_ID";

    /** Join predicates linking step execution (S), job execution (J) and job instance (I). */
    private static final String PAGING_JOIN_CLAUSE = "S.JOB_EXECUTION_ID = J.JOB_EXECUTION_ID AND J.JOB_INSTANCE_ID = I.JOB_INSTANCE_ID";

    private DataSource dataSource;

    /**
     * @param dataSource the dataSource to set
     */
    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Validates the configuration and fills in defaults before delegating to the parent.
     *
     * <p>A {@link JdbcTemplate} is created from the data source when none was set, and a
     * placeholder incrementer that always returns 0 is installed (presumably only to satisfy
     * the parent's validation; none of the methods added by this class insert rows).
     *
     * @throws IllegalStateException if no data source has been provided
     * @throws Exception if the parent initialization fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {

        Assert.state(dataSource != null, "DataSource must be provided");

        if (getJdbcTemplate() == null) {
            setJdbcTemplate(new JdbcTemplate(dataSource));
        }
        setStepExecutionIncrementer(new AbstractDataFieldMaxValueIncrementer() {
            @Override
            protected long getNextKey() {
                return 0;
            }
        });

        super.afterPropertiesSet();

    }

    /**
     * Lists the distinct step names recorded for a job, minus those matching an exclusion pattern.
     *
     * @param jobName exact name of the job
     * @param excludesPattern pattern evaluated with {@link PatternMatcher#match(String, String)};
     * matching names are dropped. {@code null} excludes nothing.
     * @return the remaining step names, in the order the database returned them
     */
    public Collection<String> findStepNamesForJobExecution(String jobName, String excludesPattern) {

        List<String> list = getJdbcTemplate().query(getQuery(STEP_EXECUTIONS_FOR_JOB), new RowMapper<String>() {
            @Override
            public String mapRow(ResultSet rs, int rowNum) throws SQLException {
                return rs.getString(1);
            }
        }, jobName);

        // LinkedHashSet keeps the query order while guaranteeing uniqueness.
        Set<String> stepNames = new LinkedHashSet<String>(list);
        if (excludesPattern == null) {
            return stepNames;
        }

        for (Iterator<String> iterator = stepNames.iterator(); iterator.hasNext();) {
            String name = iterator.next();
            if (PatternMatcher.match(excludesPattern, name)) {
                iterator.remove();
            }
        }

        return stepNames;

    }

    /**
     * Loads every step execution that belongs to one job execution.
     *
     * <p>The returned {@link StepExecution} objects are built without a parent job execution
     * (see {@link StepExecutionRowMapper}).
     *
     * @param executionId id of the job execution
     * @return the step executions ordered by ascending step execution id; empty if there are none
     */
    public Collection<StepExecution> getStepExecutionsByExecutionId(long executionId) {
        // query(...) with a RowMapper yields an empty list when nothing matches,
        // so no exception handling is needed for the "no rows" case.
        return getJdbcTemplate().query(getQuery(STEP_EXECUTIONS_FOR_JOB_EXECUTION),
                new StepExecutionRowMapper(), executionId);
    }

    /**
     * Pages through step executions filtered by job name and step name, newest id first.
     *
     * <p>A {@code *} in either name switches that filter from equality to SQL {@code LIKE},
     * with every {@code *} translated to {@code %}.
     *
     * @param jobName job name, or a pattern containing {@code *}
     * @param stepName step name, or a pattern containing {@code *}
     * @param start index of the first row wanted; values {@code <= 0} select the first page
     * @param count page size
     * @return the requested page; empty when the start position cannot be resolved to a single row
     */
    public Collection<StepExecution> findStepExecutions(String jobName, String stepName, int start, int count) {

        String whereClause;

        if (jobName.contains("*")) {
            whereClause = "JOB_NAME like ?";
            jobName = jobName.replace("*", "%");
        }
        else {
            whereClause = "JOB_NAME = ?";
        }

        if (stepName.contains("*")) {
            whereClause = whereClause + " AND STEP_NAME like ?";
            stepName = stepName.replace("*", "%");
        }
        else {
            whereClause = whereClause + " AND STEP_NAME = ?";
        }

        PagingQueryProvider queryProvider = getPagingQueryProvider(whereClause);

        if (start <= 0) {
            return getJdbcTemplate().query(queryProvider.generateFirstPageQuery(count),
                    new StepExecutionRowMapper(), jobName, stepName);
        }

        try {
            // First resolve the sort-key value to continue after, then fetch the page that follows it.
            Long startAfterValue = getJdbcTemplate().queryForObject(
                    queryProvider.generateJumpToItemQuery(start, count), Long.class, jobName, stepName);
            return getJdbcTemplate().query(queryProvider.generateRemainingPagesQuery(count),
                    new StepExecutionRowMapper(), jobName, stepName, startAfterValue);
        }
        catch (IncorrectResultSizeDataAccessException e) {
            // The jump-to-item lookup did not yield exactly one row; treat it as "no such page".
            return Collections.emptyList();
        }

    }

    /**
     * Counts the step executions of a job for a step name or step-name pattern.
     *
     * <p>Only {@code stepName} is treated as a wildcard pattern here ({@code *} becomes
     * {@code %}); unlike {@link #findStepExecutions(String, String, int, int)}, {@code jobName}
     * is always compared for equality.
     *
     * @param jobName exact name of the job
     * @param stepName step name, or a pattern containing {@code *}
     * @return the number of matching step executions
     */
    public int countStepExecutions(String jobName, String stepName) {
        if (stepName.contains("*")) {
            return getJdbcTemplate().queryForObject(getQuery(COUNT_STEP_EXECUTIONS_FOR_STEP_PATTERN), Integer.class, jobName,
                    stepName.replace("*", "%"));
        }
        return getJdbcTemplate().queryForObject(getQuery(COUNT_STEP_EXECUTIONS_FOR_STEP), Integer.class, jobName, stepName);
    }

    /**
     * Builds a paging query provider over the step execution / job execution / job instance
     * join, sorted by descending {@code STEP_EXECUTION_ID}.
     *
     * @param whereClause extra filter predicates, or {@code null} for none; the join
     * predicates are always applied
     * @return a {@link PagingQueryProvider} with a where clause to narrow the
     * query
     * @throws IllegalStateException if the factory cannot create the provider
     */
    private PagingQueryProvider getPagingQueryProvider(String whereClause) {
        SqlPagingQueryProviderFactoryBean factory = new SqlPagingQueryProviderFactoryBean();
        factory.setDataSource(dataSource);
        factory.setFromClause(getQuery("%PREFIX%STEP_EXECUTION S, %PREFIX%JOB_EXECUTION J, %PREFIX%JOB_INSTANCE I"));
        factory.setSelectClause(FIELDS);
        Map<String, Order> sortKeys = new HashMap<String, Order>();
        sortKeys.put("STEP_EXECUTION_ID", Order.DESCENDING);
        factory.setSortKeys(sortKeys);
        // Always join the three tables; without these predicates the query is a cartesian product.
        if (whereClause != null) {
            factory.setWhereClause(whereClause + " AND " + PAGING_JOIN_CLAUSE);
        }
        else {
            factory.setWhereClause(PAGING_JOIN_CLAUSE);
        }
        try {
            return (PagingQueryProvider) factory.getObject();
        }
        catch (Exception e) {
            throw new IllegalStateException("Unexpected exception creating paging query provide", e);
        }
    }

    /**
     * Maps one row selected with {@link #FIELDS} to a {@link StepExecution}.
     *
     * <p>Columns are read by position in the order {@code FIELDS} lists them. The step
     * execution is constructed with a {@code null} job execution, so the result carries no
     * reference to its parent.
     */
    private static class StepExecutionRowMapper implements RowMapper<StepExecution> {

        @Override
        public StepExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
            StepExecution stepExecution = new StepExecution(rs.getString(2), null);
            stepExecution.setId(rs.getLong(1));
            stepExecution.setStartTime(rs.getTimestamp(3));
            stepExecution.setEndTime(rs.getTimestamp(4));
            stepExecution.setStatus(BatchStatus.valueOf(rs.getString(5)));
            stepExecution.setCommitCount(rs.getInt(6));
            stepExecution.setReadCount(rs.getInt(7));
            stepExecution.setFilterCount(rs.getInt(8));
            stepExecution.setWriteCount(rs.getInt(9));
            stepExecution.setExitStatus(new ExitStatus(rs.getString(10), rs.getString(11)));
            stepExecution.setReadSkipCount(rs.getInt(12));
            stepExecution.setWriteSkipCount(rs.getInt(13));
            stepExecution.setProcessSkipCount(rs.getInt(14));
            stepExecution.setRollbackCount(rs.getInt(15));
            stepExecution.setLastUpdated(rs.getTimestamp(16));
            stepExecution.setVersion(rs.getInt(17));
            return stepExecution;
        }

    }
}
